Optimized Moving Median Filter¶

Using Min/Max Heap with Static Memory Allocation¶

Kostadin Bajalcaliev, Index: 239038

Problem¶

In ECG signal processing, the first step is to remove noise, such as high-frequency powerline interference (50/60 Hz) and low-frequency baseline wander. While band-pass filtering is a common solution, and Butterworth filters are frequently preferred, they require forward-backward filtering to eliminate phase distortion. This not only doubles the computational load but also introduces critical latency in real-time applications.

Discussion¶

Baseline wander is typically handled using high-pass filters with cutoff frequencies ranging from 0.5 Hz to 1 Hz. Caused by factors such as respiration, electrode motion, and patient movement, it is non-stationary and lacks a predictable frequency profile. This artifact can significantly degrade ECG signal quality, which particularly impacts the detection and delineation of low-amplitude, low-frequency features like the P-wave and T-wave. Accurate detection of these features is dependent on a precise estimation of the isoelectric line.

An alternative approach is the use of median filters. These non-linear filters replace each sample with the median value within a sliding window. This approach is less sensitive to outliers and preserves the shape of the ECG waveform, without introducing phase distortion, making them suitable for real-time applications. However, the frequency response of median filters is complex and not as well-defined as that of linear filters. This makes it challenging to predict their effect on the ECG signal, especially in patients with a complex interplay of arrhythmias or other abnormalities.

The use of longer window sizes can improve the characteristics of the median filter, making it less likely to treat exceptionally wide or distorted QRS morphologies as baseline wander. Combining a median filter with a low-pass Lynn filter results in more effective baseline wander removal. This hybrid approach achieves a frequency response similar to that of the Lynn filter alone while leveraging the advantages of the median filter. However, this comes at the cost of increased computational complexity and memory usage.

The naive sort-based implementation of a median filter has a time complexity of O(N log N) per sample, which is problematic for real-time applications, especially with longer window sizes. Using min/max heaps can reduce this to O(log N) per sample. However, the sliding window requires the removal of outgoing samples, an operation not natively supported by heaps. To address this, a lazy deletion strategy can be employed, where samples are marked for deletion and only removed when they reach the top of the heap. This strategy introduces additional memory overhead due to dynamic memory allocation. In the worst case, some samples may remain in the heap for extended periods before being deleted, or even indefinitely.

Implementation¶

Use of static memory allocation is highly desirable for real-time and embedded applications, pre-allocating memory for the maximum number of samples that can be in the heaps at any given time. Removal of outgoing samples now requires searching the heaps and rebalancing them, before inserting the incoming sample. As the median is always at the root of one of the heaps, it only required to search one of the heaps, half the window size, resulting in time complexity O(N log N). The element to be removed is more likely to be towards the bottom of the heap, experiments showed that searching in reverse order results in faster average search. Compared to the naive sort-based implementation, this approach reduces the computing time up to 30%-40% for window sizes in the range of 500-600 ms. Additional benefit is that min/max heap approach can elegantly handle dymamic window sizes, computing the median of measurments in a fixed time window instead of a fixed number of successive samples covering varying time spans.

Static memory allocation is highly desirable for real-time and embedded applications. This involves pre-allocating memory for the maximum number of samples the heaps can contain. To process a new sample, the outgoing sample must first be located and removed from its heap, requiring a search and rebalancing operation, before the incoming sample is inserted. Since the median is always at the root of one of the heaps, only one heap needs to be searched. This results in a worst-case time complexity of O(N). Empirical testing showed that searching the heap in reverse order (from the last leaf node upwards) yields a faster average search time, as the element to be removed is statistically more likely to be located near the bottom.

Compared to the naive sort-based implementation, this heap-based approach with static allocation reduces computing time by 30% to 40% for window sizes corresponding to 500-600 ms at 250 Hz. An additional benefit is that the min/max heap approach can elegantly handle dynamic window sizes, calculating the median over a fixed time window rather than a fixed number of successive samples, which can cover varying time spans.

Figures¶

The provided strips display two minutes of signal from the MIT-BIH record 101, sampled at 250 Hz and contaminated with both high-frequency noise and baseline wander. A third-order Butterworth band-pass filter (0.5-25 Hz) serves as the reference.

  • Top (Black): The raw signal. The green line represents the isoelectric line estimated by the Median+Lynn filter approach.
  • Middle (Dark Blue): The signal after baseline wander removal, followed by adaptive low-pass filtering.
  • Bottom (Red): The signal processed with the Butterworth band-pass filter, applied in a zero-phase manner using forward and backward passes on 1000 ms segments with a 500 ms overlap.
  • Last (Brown): The signal processed with a forward-pass-only Butterworth band-pass filter, clearly demonstrating the effects of phase distortion.

The use of any linear low-pass filter can affect the amplitudes of QRS complexes, particularly when the cutoff frequency is near their dominant spectral components. This amplitude attenuation can lead to an underestimation of QRS amplitude and alter the relative amplitudes of the P and T waves, potentially impacting clinical interpretation. To mitigate this, the middle (dark blue) signal is a result of an adaptive filtering technique designed to preserve the high-frequency QRS complexes.

Conceptually, this technique combines a primary Savitzky–Golay filter with a secondary low-pass Lynn filter. The resulting filtered signals are merged using a Kaiser–Bessel-derived window as a weighting function, centered on the QRS complexes. The detailed description of this method is beyond the scope of this document.

In [6]:
import dbx
%config InlineBackend.figure_format = 'svg'
dbx.Plot("MITDB", "101", pages = [2,3,4,5])
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image

CPU Profiling¶

Profiling data indicates that the Median Filter accounts for significant CPU usage,
highlighting the importance of optimizing this specific component in the processing pipeline.

image.png

Naive Sort Approach¶

In [ ]:
/// sorted.go

package asp

type SortMedian struct {
	Delay  int
	Length int
	Strict bool

	Sum    int
	Last   int
	Value  int
	First  int
	Pivot  int
	Index  int
	Ready  bool
	Buffer []int
	Sorted []int
}

func NewSortMedian(length int) *SortMedian {
	obj := &SortMedian{
		Delay:  length / 2,
		Length: length,
		Strict: true,
		Buffer: make([]int, length),
		Sorted: make([]int, length),
	}
	for i := 0; i < length; i++ {
		obj.Buffer[i] = 0
		obj.Sorted[i] = 0
	}
	return obj
}

func (obj *SortMedian) Reset() {
	obj.Sum = 0
	obj.Last = 0
	obj.Value = 0
	obj.First = 0
	obj.Pivot = 0
	obj.Index = 0
	obj.Ready = false
	for i := 0; i < obj.Length; i++ {
		obj.Buffer[i] = 0
		obj.Sorted[i] = 0
	}
}

func (obj *SortMedian) Filter(datum int) int {
	if !obj.Ready && obj.Index == 0 {
		for i := 0; i < obj.Length; i++ {
			obj.Buffer[i] = datum
			obj.Sorted[i] = datum
			obj.Sum += datum
		}
	}

	var rem = obj.Buffer[obj.Index]
	obj.Buffer[obj.Index] = datum
	obj.Index++
	if obj.Index == obj.Length {
		obj.Index = 0
		obj.Ready = true
	}
	obj.First = obj.Buffer[obj.Index]

	obj.Sum += datum - rem
	obj.Last = datum

	var half = obj.Index + obj.Delay
	if half >= obj.Length {
		half -= obj.Length
	}
	obj.Pivot = obj.Buffer[half]

	var ix = 0
	for i := 0; i < obj.Length; i++ {
		if obj.Sorted[i] == rem {
			ix = i
			break
		}
	}

	obj.Sorted[ix] = datum
	for i := ix; i < obj.Length-1; i++ {
		if obj.Sorted[i] > obj.Sorted[i+1] {
			obj.Sorted[i], obj.Sorted[i+1] = obj.Sorted[i+1], obj.Sorted[i]
		} else {
			break
		}
	}
	for i := ix; i > 0; i-- {
		if obj.Sorted[i] < obj.Sorted[i-1] {
			obj.Sorted[i], obj.Sorted[i-1] = obj.Sorted[i-1], obj.Sorted[i]
		} else {
			break
		}
	}

	var m = obj.Delay
	if obj.Strict {
		obj.Value = obj.Sorted[m]
	} else {

		obj.Value = obj.Sorted[m-1] + obj.Sorted[m] + obj.Sorted[m+1]/3
	}

	return obj.Value
}

Mix/Max Heap Approach¶

In [ ]:
/// heap.go

package asp

// Fixed-capacity heap.
type Heap struct {
	Cap int   // maximum number of elements
	Min bool  // max-heap if false, min-heap if true
	Len int   // current number of elements
	Buf []int // backing array, length = capacity
}

func NewHeap(capy int, mini bool) *Heap {
	if capy <= 0 {
		panic("Heap capacity must be > 0")
	}
	return &Heap{
		Cap: capy,
		Min: mini,
		Len: 0,
		Buf: make([]int, capy),
	}
}

func (h *Heap) Reset() {
	h.Len = 0
	for i := range h.Buf {
		h.Buf[i] = 0
	}
}

func (h *Heap) Top() int {
	if h.Len == 0 {
		panic("Heap.Top on empty heap")
	}
	return h.Buf[0]
}

func (h *Heap) Push(x int) {
	if h.Len == len(h.Buf) {
		panic("Heap overflow")
	}
	h.Buf[h.Len] = x
	h.up(h.Len)
	h.Len++
}

func (h *Heap) Pop() int {
	if h.Len == 0 {
		panic("Heap.Pop on empty heap")
	}
	top := h.Buf[0]
	h.Len--
	h.swap(0, h.Len)
	h.down(0)
	return top
}

func (h *Heap) cmp(i, j int) bool {
	ai, aj := h.Buf[i], h.Buf[j]
	if h.Min {
		return ai < aj
	} else {
		return ai > aj
	}
}

func (h *Heap) swap(i, j int) {
	h.Buf[i], h.Buf[j] = h.Buf[j], h.Buf[i]
}

func (h *Heap) up(i int) {
	for {
		if i == 0 {
			return
		}
		p := (i - 1) >> 1
		if !h.cmp(i, p) {
			return
		}
		h.swap(i, p)
		i = p
	}
}

func (h *Heap) down(i int) {
	for {
		l := i*2 + 1
		if l >= h.Len {
			return
		}
		j := l
		r := l + 1
		if r < h.Len && h.cmp(r, l) {
			j = r
		}
		if !h.cmp(j, i) {
			return
		}
		h.swap(i, j)
		i = j
	}
}

// Remove the deepest occurrence of val from the heap.
func (h *Heap) remove(val int) bool {
	for i := h.Len - 1; i >= 0; i-- {
		if h.Buf[i] == val {
			_ = h.removeAt(i)
			return true
		}
	}
	return false
}

// Remove the element at index i from the heap.
func (h *Heap) removeAt(i int) int {
	if i == 0 {
		return h.Pop()
	}
	if i >= h.Len {
		panic("invalid index")
	}
	r := h.Buf[i]
	h.Len--
	h.swap(i, h.Len)
	// Try both up and down since the swapped element
	// may violate heap property in either direction.
	h.up(i)
	h.down(i)
	return r
}
In [ ]:
/// median.go

package asp

type HeapMedian struct {
	Length int
	Delay  int

	Index int
	Ready bool
	First int
	Pivot int

	Value int

	Data  []int
	Small *Heap // lower half (extra element for odd)
	Large *Heap // upper half
}

func NewHeapMedian(length int) *HeapMedian {
	if length <= 0 {
		panic("window size must be positive")
	}
	var obj = &HeapMedian{
		Length: length,
		Delay:  length / 2,
		Value:  0,
		Data:   make([]int, length),
		Small:  NewHeap(length/2+2, false),
		Large:  NewHeap(length/2+2, true),
	}
	return obj
}

func (obj *HeapMedian) Reset() {
	obj.Index = 0
	obj.Ready = false
	obj.First = 0
	obj.Pivot = 0

	obj.Value = 0
	for i := 0; i < len(obj.Data); i++ {
		obj.Data[i] = 0
	}
	obj.Small.Reset()
	obj.Large.Reset()
}

func (obj *HeapMedian) Filter(x int) int {
	if obj.Ready {
		obj.Pop(obj.Index)
	}
	obj.Push(obj.Index, x)

	obj.Index++
	if obj.Index == obj.Length {
		obj.Ready = true
		obj.Index = 0
	}
	obj.First = obj.Data[obj.Index]

	var half = obj.Index + obj.Delay
	if half >= obj.Length {
		half -= obj.Length
	}
	obj.Pivot = obj.Data[half]

	var val int
	if obj.Small.Len > obj.Large.Len {
		val = obj.Small.Top()
	} else if obj.Small.Len == obj.Large.Len {
		val = (obj.Small.Top() + obj.Large.Top()) / 2
	} else {
		val = obj.Large.Top()
	}
	obj.Value = val
	return val
}

func (obj *HeapMedian) Pop(head int) {
	var val = obj.Data[head]
	obj.remove(val)
	obj.rebalance()
}

func (obj *HeapMedian) Push(tail int, x int) {
	obj.Data[tail] = x
	if obj.Small.Len == 0 || (obj.Small.Len > 0 && x <= obj.Small.Top()) {
		obj.Small.Push(x)
	} else {
		obj.Large.Push(x)
	}
	obj.rebalance()
}

func (obj *HeapMedian) targetSizes() (wantS, wantL int) {
	var count = obj.Small.Len + obj.Large.Len
	if count == 0 {
		return 0, 0
	}
	if count&1 == 1 {
		return (count + 1) / 2, count / 2
	} else {
		return count / 2, count / 2
	}
}

func (obj *HeapMedian) rebalance() {
	wantS, _ := obj.targetSizes()
	for obj.Small.Len > wantS {
		v := obj.Small.Pop()
		obj.Large.Push(v)
	}
	for obj.Large.Len > 0 && obj.Small.Len < wantS {
		v := obj.Large.Pop()
		obj.Small.Push(v)
	}
}

func (obj *HeapMedian) remove(val int) {
	if val <= obj.Small.Top() {
		if obj.Small.remove(val) {
			return
		}
	}
	if val >= obj.Large.Top() {
		if obj.Large.remove(val) {
			return
		}
	}
}

Test Code¶

In [ ]:
/// cmd/test.go

package main

import (
	"finki/asp"
	"fmt"
	"math/rand"
	"time"
)

func TestMedianRandom() {
	f := asp.NewSortMedian(11)
	m := asp.NewHeapMedian(11)

	rng := rand.New(rand.NewSource(42))
	for i := 0; i < 1*3600*250; i++ {
		v := i + rng.Intn(1000)
		a := f.Filter(v)
		b := m.Filter(v)
		if m.Ready && a != b {
			println(">>> MISMATCH", i, v, a, b)
			panic("Median mismatch")
		}
	}
	fmt.Println("Median random test passed")
}

func TestMedianPerf() {
	WIN := (600 / 2) | 1
	LEN := 10 * 3600 * 250

	DATA := make([]int, LEN)
	{
		rng := rand.New(rand.NewSource(42))
		for i := 0; i < LEN; i++ {
			DATA[i] = rng.Intn(1000)
		}
	}

	var srt time.Duration
	{
		med := asp.NewSortMedian(WIN)
		start := time.Now()
		for i := 0; i < LEN; i++ {
			v := DATA[i]
			med.Filter(v)
		}
		srt = time.Since(start)
		fmt.Println("SortMedian took:", srt.Microseconds())
	}

	var hep time.Duration
	{
		med := asp.NewHeapMedian(WIN)
		start := time.Now()
		for i := 0; i < LEN; i++ {
			v := DATA[i]
			med.Filter(v)
		}
		hep = time.Since(start)
		fmt.Println("HeapMedian took:", hep.Microseconds())
	}

	fmt.Printf("Heap/Sort = %.3f%%\n", float64(hep)*100/float64(srt)-100)
}

func main() {
	TestMedianRandom()
	TestMedianPerf()
}

Example Execution¶

In [ ]:
## GO #################################

$ go run cmd/test.go

Median random test passed
SortMedian took: 2977968
HeapMedian took: 1907521
Heap/Sort = -35.946%

## Java ###############################

$ cd java
$ javac SortMedian.java Heap.java HeapMedian.java MedianTest.java
$ java MedianTest

Median random test passed
SortMedian took: 1722689
HeapMedian took: 1559468
Heap/Sort = -9.475%